package com.atuguigu.flink.Day01.WordCount;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

//Flink入门WordCount案例
public class WordCount {
    public static void main(String[] args) throws Exception {
        //TODO 1 获取上下文环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //设置并行度为1
        env.setParallelism(1);

        //TODO 2 获取数据源
        DataStreamSource<String> stream = env.fromElements("hello flink", "hello flink");
        //进行map操作,将数据变成为(hello,1),(flink,1)
        //flatmap针对每一个元素，生成0和1多个元素
        SingleOutputStreamOperator<WordWithCount> mapperStream = stream.flatMap(
                new Tonkenizer()
        );

        //TODO 3 shuffle阶段,进行分组
        KeyedStream<WordWithCount, String> KeyedStream = mapperStream.keyBy(
                r -> r.word
        );


        //TODO 4 Reduce阶段,将数据变为（Word,n）

        SingleOutputStreamOperator<WordWithCount> reduce = KeyedStream.reduce(
                new ReduceFunction<WordWithCount>() {
                    @Override
                    public WordWithCount reduce(WordWithCount t1, WordWithCount t2) throws Exception {
                        return new WordWithCount(t1.word, t1.count + t2.count);
                    }
                }
        );

        reduce.print();

        //执行
        env.execute();
    }
    public static class Tonkenizer implements FlatMapFunction<String,WordWithCount> {

        @Override
        public void flatMap(String s, Collector<WordWithCount> collector) throws Exception {
            //将数据通过切分
            String[] arr = s.split(" ");
            //循环将数据变为(word,1)
            for(String element:arr){
                collector.collect(new WordWithCount(element,1L));
            }
        }
    }
    //POJI类
    public static class WordWithCount{
        public  String word;
        public  Long count;

        public WordWithCount() {
        }

        public WordWithCount(String word, Long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return "WordWithCount{" +
                    "word='" + word + '\'' +
                    ", count=" + count +
                    '}';
        }
    }
}
